Step Functions + Batchで並列処理するステートマシンを作ってみた

Step Functions + Batchで並列処理するステートマシンを作ってみた

前回に引き続き、Step FunctionsとBatchで並列処理を行うステートマシンを構築してみました。
Clock Icon2021.06.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。AWS事業本部のKyoです。

前回に続き、Step Functions + Batchで科学計算をイメージしつつ、できるだけシンプルなステートマシンを構築してみます。 今回は並列処理を実装するため、「Parallel」というステートを利用します。

何をやるのか

並列計算を行い、最後にそれらの計算結果を入力とした計算を行います。

詳細

前回作成したBatchのジョブ定義 Incrementを今回も利用します。

Incrementは1つの入力ファイルから1つの出力ファイルを生成します。入力ファイルには任意の数字が含まれており、そこに「1」を足したものを出力ファイルに書き込みます。入力・出力ファイルともにS3の任意のパスからダウンロード・アップロードします。

Parallelでの並列計算には2種類のブランチA, Bが存在し、任意の回数のIncrementを行います。Aでは2回、Bでは1回行う設定です。

最後に行うSumは2つの入力ファイルから与えられる数値を合計し、1つの出力ファイルを生成します。これも入力・出力ファイルともにS3の任意のパスからダウンロード・アップロードします。

準備

Sumについて説明します。

2つの入力ファイルから1つの出力ファイルを生成します。入力ファイルには任意の数字が含まれており、それらの和を出力ファイルに書き込みます。スクリプト自体はIncrementとほぼ同じで、計算のロジックとそれに関わるログやファイル取得が多少変化しています。

コンテナをビルド、ECRにプッシュし、Batchのジョブ定義も作成しておきます。

また、ステートマシンで利用する入力ファイルもS3にアップロードします。

Dockerfile

FROM amazonlinux:2

# Install AWS CLI v2
RUN yum install -y unzip less && \
    curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
    unzip awscliv2.zip && \
    ./aws/install

WORKDIR /usr/local/bin
ADD sum.sh sum.sh

CMD [ "sh", "sum.sh" ]

スクリプト

INPUT_FILE_NAME_1=$(basename $INPUT_S3_URI_1)
INPUT_FILE_NAME_2=$(basename $INPUT_S3_URI_2)
OUTPUT_FILE_NAME=$(basename $OUTPUT_S3_URI)

echo "START: $STEP_NAME"

# S3から入力ファイルをダウンロード
aws s3 cp $INPUT_S3_URI_1 $INPUT_FILE_NAME_1  || exit 1
aws s3 cp $INPUT_S3_URI_2 $INPUT_FILE_NAME_2  || exit 1

# 入力ファイル内の数値を表示
input_num_1=`cat $INPUT_FILE_NAME_1`
input_num_2=`cat $INPUT_FILE_NAME_2`
echo "input1: $input_num_1, input2: $input_num_2"

# 計算
output_num=$(( $input_num_1 + $input_num_2 ))
echo "output: $output_num"

# 計算結果を出力ファイルへ
echo $output_num >$OUTPUT_FILE_NAME

# S3へファイルをアップロード
aws s3 cp $OUTPUT_FILE_NAME $OUTPUT_S3_URI || exit 1

echo "COMPLETE: $STEP_NAME"

ジョブ定義

  • Fargate起動タイプ
  • 1 vCPU, 2048 メモリ
  • ジョブロールとして、「S3FullAccess」ポリシーをもったロール

ステートマシンで利用する入力ファイル

以下の2種類のファイルをS3にアップロードしておきました。パスは任意ですが、後ほど利用するのでメモしておきます。

  • A_in.txt
    • ファイルの中身は「2」
  • B_in.txt
    • ファイルの中身は「1」

やってみる

ステートマシン

今回もVSCode + AWS Toolkitを利用しています。

Parallelでのブランチはネストする形で表現されていますね。また、各ブランチの終わりにはEnd: trueが宣言されています。

Comment: >-
  Second state machine.
StartAt: Parallel
States:      
  Parallel:
    Type: Parallel
    ResultPath: null
    Next: Sum
    Branches:
      - StartAt: A_1st_Increment
        States:
            A_1st_Increment:
                Type: Task
                Resource: 'arn:aws:states:::batch:submitJob.sync'
                ResultPath: null
                Parameters:
                    JobName.$: $$.State.Name
                    JobDefinition.$: $.params.jobdefs.increment
                    JobQueue.$: $.params.queue
                    ContainerOverrides:
                        Environment:
                            -   Name: STEP_NAME
                                Value.$: $$.State.Name
                            -   Name: INPUT_S3_URI
                                Value.$: $.params.environment.A_INPUT_S3_URI
                            -   Name: OUTPUT_S3_URI
                                Value.$: $.params.environment.A_INTERMEDIATE_S3_URI_1
                        Command:
                            - 'sh'
                            - 'increment.sh'
                Next: A_2nd_Increment
            A_2nd_Increment:
                Type: Task
                Resource: arn:aws:states:::batch:submitJob.sync
                ResultPath: null
                Parameters:
                    JobName.$: $$.State.Name
                    JobDefinition.$: $.params.jobdefs.increment
                    JobQueue.$: $.params.queue
                    ContainerOverrides:
                        Environment:
                            -   Name: STEP_NAME
                                Value.$: $$.State.Name
                            -   Name: INPUT_S3_URIA
                                Value.$: $.params.environment.A_INTERMEDIATE_S3_URI_1
                            -   Name: OUTPUT_S3_URI
                                Value.$: $.params.environment.A_OUTPUT_S3_URI
                        Command:
                            - 'sh'
                            - 'increment.sh'
                End: true
      - StartAt: B_1st_Increment
        States:
            B_1st_Increment:
                Type: Task
                Resource: arn:aws:states:::batch:submitJob.sync
                ResultPath: null
                Parameters:
                    JobName.$: $$.State.Name
                    JobDefinition.$: $.params.jobdefs.increment
                    JobQueue.$: $.params.queue
                    ContainerOverrides:
                        Environment:
                            -   Name: STEP_NAME
                                Value.$: $$.State.Name
                            -   Name: INPUT_S3_URI
                                Value.$: $.params.environment.B_INPUT_S3_URI
                            -   Name: OUTPUT_S3_URI
                                Value.$: $.params.environment.B_OUTPUT_S3_URI
                        Command:
                            - 'sh'
                            - 'increment.sh'
                End: true
  Sum:
    Type: Task
    Resource: arn:aws:states:::batch:submitJob.sync
    ResultPath: null
    Parameters:
        JobName.$: $$.State.Name
        JobDefinition.$: $.params.jobdefs.sum
        JobQueue.$: $.params.queue
        ContainerOverrides:
            Environment:
                -   Name: STEP_NAME
                    Value.$: $$.State.Name
                -   Name: INPUT_S3_URI_1
                    Value.$: $.params.environment.A_OUTPUT_S3_URI
                -   Name: INPUT_S3_URI_2
                    Value.$: $.params.environment.B_OUTPUT_S3_URI
                -   Name: OUTPUT_S3_URI
                    Value.$: $.params.environment.SUM_OUTPUT_S3_URI
            Command:
                - 'sh'
                - 'sum.sh'
    End: true

入力

以下のようなJSONを入力とします(値はダミーです)。paramsで 利用するキューとジョブ定義を、environmentで各ブランチの入出力を定義しています。A_INPUT_S3_URIおよびB_INPUT_S3_URIは先程アップロードした入力ファイルのURIです。A_INTERMEDIATE_S3_URI_1はブランチAの計算途中(A_1st_Increment)の結果をアップロードするURIです。

{
    "params": {
        "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu",
        "jobdefs": {
            "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/increment-job-def:1",
            "sum": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/sum-job-def:1"
        },
      "environment": {
            "A_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/A/a_in.txt",
            "A_INTERMEDIATE_S3_URI_1": "s3://my-sfs-bucket-123456789012/output/A/a_intermediate_1.txt",
            "A_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/A/a_out.txt",
            "B_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/B/b_in.txt",
            "B_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/B/b_out.txt",
            "SUM_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/Sum/sum_out.txt"
        }
    }
}

結果

無事にステートマシンの実行が終わりました。

A, B両ブランチの計算結果であるA_OUTPUT_S3_URI, B_OUTPUT_S3_URIを見てみるとそれぞれ、「4」, 「2」でした。さらにSUMの結果であるSUM_OUTPUT_S3_URIは「6」 でした。A, B両ブランチの和になっているので、想定どおりの動きですね。

少しハマったところ

以下のエラーメッセージに遭遇しました(上記コードでは対応済み)。

The JSONPath '$.params.jobdefs.sum' specified for the field 'JobDefinition.$' could not be found in the input

原因はParalleの出力が配列であることでした。今回はブランチが2つだったので、要素が2つのJSON配列です。

[
  {
    "params": {
      "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu",
      "jobdefs": {
        "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/my-calc-job-def:1",
        "sum": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/sum-job-def:1"
      },
      "environment": {
        "A_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/A/a_in.txt",
        "A_INTERMEDIATE_S3_URI_1": "s3://my-sfs-bucket-123456789012/output/A/a_intermediate_1.txt",
        "A_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/A/a_out.txt",
        "B_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/B/b_in.txt",
        "B_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/B/b_out.txt",
        "SUM_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/Sum/sum_out.txt"
      }
    }
  },
  {
    "params": {
      "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu",
      "jobdefs": {
        "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/my-calc-job-def:1",
        "sum": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/sum-job-def:1"
      },
      "environment": {
        "A_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/A/a_in.txt",
        "A_INTERMEDIATE_S3_URI_1": "s3://my-sfs-bucket-123456789012/output/A/a_intermediate_1.txt",
        "A_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/A/a_out.txt",
        "B_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/B/b_in.txt",
        "B_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/B/b_out.txt",
        "SUM_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/Sum/sum_out.txt"
      }
    }
  }
]

対応

2種類の対応があります。

  1. ParalleでResultPath: nullを指定する
  2. JSONPath式を利用する

1.はParalleの出力を利用しないという設定です。ブランチの中ではなく、Paralleに設定する必要があることに注意してください。 2.はParalleの出力から必要な部分を抜いてくるイメージです。

1つめの要素からSumのジョブ定義Arnを取得するなら、以下のようになります。

$.[0].params.jobdefs.sum

JSONPathの操作はKinesisのドキュメントにJSONPathの操作の操作がまとまっていたので参考になりました。また、StepFunctionsのデータフローシミュレーターを使って、実際の入力に対してどのようなJSONPath式で欲しいデータが取得できるのか試すこともできます。

今回は後続の処理がなかったので、1. のParalleでResultPath: nullで対応しています。

おわりに

今回はStep Functions + Batchを利用して並列処理および、その結果をまとめる処理を実装してみました。

本エントリではParallelを1つ利用しましたが、複数利用することでより複雑なステートマシンも組めるようになります。

以上、なにかのお役に立てれば幸いです。

あわせて読みたい

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.